
Adding node_id to ExecutionPlanProperties #12186

Open · wants to merge 2 commits into main

Conversation

@ameyc (Contributor) commented Aug 27, 2024

Which issue does this PR close?

Closes #11364

Rationale for this change

Currently, ExecutionPlans don't have an identifier associated with them, making it hard to distinguish between nodes for
use cases such as snapshotting continuous pipelines, displaying node metrics in a UI, etc.

What changes are included in this PR?

Changes to:

  1. ExecutionPlanProperties to add a node_id: Option<usize> field.
  2. ExecutionPlan to add a with_node_id() method that returns a copy of the ExecutionPlan with the assigned node id.
  3. SessionState to annotate finalized physical plans with node ids.
  4. Utils in physical-plan/src/node_id.rs to traverse ExecutionPlans and generate deterministic ids for the whole tree.

Are these changes tested?

Added asserts to an existing test in datafusion-examples/src/planner_api.rs.

Are there any user-facing changes?

No

@github-actions github-actions bot added physical-expr Physical Expressions core Core DataFusion crate labels Aug 27, 2024
@ozankabak (Contributor) commented Aug 27, 2024

We had similar challenges when using DataFusion in a context similar to yours (checkpointing, etc.). I consulted with my team on how we solved them, and discussed this general approach. I can share the following general thoughts/concerns:

  • In some use cases, one wants an ID for every node in the plan tree (e.g. for display/UI purposes). In others, what is actually necessary is an ID per stream (any kind of stateful work).
  • Having a default None value for IDs is potentially problematic (apparently we ran into bugs caused by this after initially trying such an approach).
  • One may want different types for the ID in different use cases (usize may not be appropriate for all).

Therefore, we think the right approach is to traverse the tree (for plans or streams, depending on your use case), generate the IDs as you see fit, and store them downstream in a map container that associates nodes with your IDs. I don't think doing this upstream inside ExecutionPlanProperties is the right solution. This is also somewhat evident when you look at what is inside: execution mode, ordering information, partitioning information, and equivalence information -- it is used as a cache to store derived properties of the plan object itself that emerge from the plan tree (not set externally).

Apologies for not being able to provide feedback and start the detailed discussion before you prepared a PR, but in my defense you were too fast :)

I think it would be great to add the traversal code and the map approach I mentioned as an example upstream for future users who want to do something like this. This is an approach we are following for indexing work as well.

@ameyc (Contributor, author) commented Aug 30, 2024

I think it would be great to add the traversal code and the map approach I mentioned as an example upstream for future users who want to do something like this. This is an approach we are following for indexing work as well.

Thanks for the feedback @ozankabak. Would it make sense to add this traversal code to the utils? It would be nice for this to be available in the core library itself.

@ozankabak (Contributor):

Let's create a new example file, add the traversal logic and a simple demonstration of how to store the node <-> id association via a map in that file. I think it will be a fairly succinct yet guiding example for many others.

You may run into difficulty creating or inserting into a map with Arc<dyn ExecutionPlan> as a key. A good way to get around it is to put the plan inside a wrapper struct and use that as the key.

We have wanted to make Arc<dyn ExecutionPlan> directly usable as a key for a while, but couldn't get around to doing it. It would be a great contribution if you'd like to work on it. It would make use cases like this smoother to code, and also help make some planning algorithms faster.

@ameyc (Contributor, author) commented Sep 11, 2024

Taking a second look at implementing this using a global hashmap keyed by Arc pointers: that solution is somewhat hacky and error-prone, and as developers trying to build on top of DataFusion it makes our experience feel pretty brittle.

While we are interested in using the node_id for stateful checkpointing, it does make sense for this to be on an ExecutionPlan, since it is a node in the PhysicalPlan graph.

In some use cases, one wants an ID for every node in the plan tree (e.g. for display/UI purposes). In others, what is actually necessary is an ID per stream (any kind of stateful work).

Streams can easily derive their id as "{node_id}_{stream_id}".

Having a default None value for IDs is potentially problematic (apparently we ran into bugs caused by this after initially trying such an approach).

Presumably, any user interested in using the "node_id" would make their operators implement "with_node_id".

One may want different types for the ID in different use cases (usize may not be appropriate for all).

For additional types of node_ids, I suppose one could maintain them in a HashMap<usize, T>, where usize is the node_id and not a pointer to an Arc.

@ozankabak (Contributor) commented Sep 11, 2024

Taking a second look at implementing this using a global hashmap keyed by Arc pointers: that solution is somewhat hacky and error-prone, and as developers trying to build on top of DataFusion it makes our experience feel pretty brittle.

Can you share a link to your implementation attempt? This is a little surprising, I'd like to go over it and understand what is going on.

@ameyc (Contributor, author) commented Sep 12, 2024

@ozankabak would love any pointers. This code is admittedly a quick draft: we pass in a hashmap and recursively traverse the tree.

use std::collections::HashMap;
use std::sync::Arc;

use datafusion::physical_plan::ExecutionPlan;

/// Hands out monotonically increasing ids during a plan traversal.
pub struct NodeIdAnnotator {
    next_id: usize,
}

impl NodeIdAnnotator {
    pub fn new() -> Self {
        NodeIdAnnotator { next_id: 0 }
    }

    pub fn next_node_id(&mut self) -> usize {
        let node_id = self.next_id;
        self.next_id += 1;
        node_id
    }
}

/// Post-order traversal: children are numbered before their parent, and each
/// id is keyed on the Arc's data pointer address.
pub fn annotate_node_id_for_execution_plan(
    plan: &Arc<dyn ExecutionPlan>,
    annotator: &mut NodeIdAnnotator,
    plan_map: &mut HashMap<usize, usize>,
) {
    for child in plan.children() {
        annotate_node_id_for_execution_plan(child, annotator, plan_map);
    }
    let node_id = annotator.next_node_id();
    let addr = Arc::as_ptr(plan) as *const () as usize;
    plan_map.insert(addr, node_id);
}

Then, when executing the plan, we need to actually run the annotation and set a global hash map singleton:

    pub async fn print_stream(self) -> Result<()> {
        let plan = self.df.as_ref().clone().create_physical_plan().await?;
        let mut node_id_map: HashMap<usize, usize> = HashMap::new();
        let mut annotator = NodeIdAnnotator::new();
        annotate_node_id_for_execution_plan(&plan, &mut annotator, &mut node_id_map);
        for (key, value) in node_id_map.iter() {
            debug!("Node {}, Id {}", key, value);
        }
        let task_ctx = self.df.as_ref().clone().task_ctx();
        set_global_node_id_hash_map(&node_id_map);
        let mut stream: SendableRecordBatchStream = execute_stream(plan, Arc::new(task_ctx))?;

Now, when creating a Stream, we need to pass a reference to the ExecutionPlan it is tied to in order for it to figure out the channel_tag/checkpoint_tag it is supposed to use to coordinate checkpoints.

Btw, the annotation output looks something like this:

[2024-09-12T21:55:33Z DEBUG denormalized::datastream] Node 5783968320, Id 0
[2024-09-12T21:55:33Z DEBUG denormalized::datastream] Node 5783975296, Id 1
[2024-09-12T21:55:33Z DEBUG denormalized::datastream] Node 5783964912, Id 2
[2024-09-12T21:55:33Z DEBUG denormalized::datastream] Node 5783965296, Id 3

In contrast, this PR annotates the node_ids during the create_physical_plan phase, so you have the node_id at Stream creation time without needing a global lookup. The corresponding print_physical_plan() now gives you the node id as well.

FilterExec: max@3 > 113, node_id=3
  StreamingWindowExec: mode=Single, gby=[...], aggr=[..], window_type=[..], node_id=2
    RepartitionExec: partitioning=Hash([sensor_name@2], 14), input_partitions=1, node_id=1
      DenormalizedStreamingTableExec: partition_sizes=1, projection=[..], infinite_source=true, node_id=0

@emgeee (Contributor) commented Sep 13, 2024

Just chiming in -- the implementation in this PR seems quite reasonable to me. While there are definitely ways to hack around the limitation of not having node IDs, those strategies would be quite vulnerable to upstream breaking changes, and given DataFusion's goal of being extensible it makes sense for these to be a core feature of the library.

@ozankabak (Contributor) commented Sep 21, 2024

@ozankabak would love any pointers. This code is admittedly a quick draft: we pass in a hashmap and recursively traverse the tree.

Thanks for the sketch, I see the challenge. After reviewing your sketch and going back and inspecting our solution in our fork, it seems to me like it may indeed make sense to add this functionality natively to upstream DF. That being said, I am worried about a few things (default value being None, placement of a non-derived attribute living in PlanProperties etc.), but I think we can find a design that mitigates my concerns.

Next week, I (or maybe @berkaysynnada) will work on a draft PR to flesh out some ideas on our end, then we can compare/contrast and arrive at a final design together. Then we can get some extra community review and merge this if no one else has other concerns or objections (whether this functionality should belong to core, etc.).

Thanks for the awesome collaboration.

@ozankabak (Contributor):

Just wanted to leave a note to stress that I haven't forgotten about this -- this is on our shortlist to focus on after resolving some urgent issues.

@ameyc (Contributor, author) commented Oct 3, 2024

Just wanted to leave a note to stress that I haven't forgotten about this -- this is on our shortlist to focus on after resolving some urgent issues.

Thanks for taking the time. Looking forward to continuing the discussion.

@emgeee (Contributor) commented Oct 28, 2024

@ozankabak have you had a chance to take a look at this yet?

@ozankabak (Contributor):

It is on my agenda for this week. I have been traveling, which caused a delay on this project (and some others).

I remain convinced that some version of this functionality belongs in the core, so rest assured we will get it across the finish line sooner or later (unless someone else raises a reasonable objection that eludes me).

@ozankabak (Contributor):

Going through our implementation, I see that we were able to get around the non-existent-ID problem by using string IDs and auto-generation logic for non-leaf nodes.

I will probe further to get a good understanding of how we can arrive at a safe and extensible design and circle back. Probably @berkaysynnada will join the collaboration as well. Thanks for your patience.

@emgeee (Contributor) commented Dec 4, 2024

Hey @ozankabak have you gotten a chance to revisit this yet?

@ozankabak (Contributor):

Sorry, some other priorities kept me busy and this slipped through the cracks. @berkaysynnada let's prioritize this for next week.

@berkaysynnada (Contributor):

Apologies for the delay in responding. I have started working on this issue and will open a draft PR to facilitate discussion, FYI @emgeee. I plan to share it today or tomorrow.

@berkaysynnada (Contributor):

Hi @emgeee and @ameyc. I apologize again for the delay in providing feedback, but I believe we can iterate quickly now and align on the best design to suit every use case.

Here is the approach we use, and I believe it could address your concerns while minimizing changes to the existing APIs:

We introduce generate_id and with_id methods to the ExecutionPlan trait:

trait ExecutionPlan {
    ...
    fn generate_id(&self) -> Result<String> {
        let children = self.children();
        match children.as_slice() {
            [] => exec_err!("Source operator {:?} should implement the generate_id method", self.type_name()),
            [child] => {
                let child_id = child.generate_id()?;
                Ok(format!("[{child_id}]"))
            }
            children => children
                .iter()
                .map(|child| child.generate_id())
                .collect::<Result<Vec<_>>>()
                .map(|ids| {
                    let result = ids.join(", ");
                    format!("[{result}]")
                }),
        }
    }

    fn with_id(self: Arc<Self>, id: String) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        if self.children().is_empty() {
            not_impl_err!("Source operators must implement with_id() method")
        } else {
            Ok(None)
        }
    }
    ...
}

Source operators, such as CsvExec, implement these methods:

impl ExecutionPlan for CsvExec {
    ...
    fn generate_id(&self) -> Result<String> {
        let Some(id) = self.id.as_ref() else {
            return exec_err!("Source ID is not set");
        };
        Ok(id.clone())
    }

    fn with_id(self: Arc<Self>, id: String) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        let mut exec = self.as_ref().clone();
        exec.id = Some(id);
        Ok(Some(Arc::new(exec)))
    }
    ...
}

One significant advantage of this approach is that it introduces minimal API changes, focusing only on source operators. It also provides flexible usage through customizable nesting logic for IDs. For instance, relationships between partitioned inputs or child operators can be refined to align with use-case needs.

A potential limitation is the use of string for IDs. However, this choice offers flexibility for cases where human-readable identifiers are preferred. Additionally, it makes the child-parent relationships self-explanatory, which is beneficial for debugging and visualization purposes.

In our use case, we construct a parallel tree to enrich operator metadata, which is used for snapshotting. This secondary tree is built from these IDs and is designed to remain isolated from the original execution plan. We have utility functions that streamline this process, and we'd be happy to contribute them upstream if needed.

If this approach aligns with upstream requirements, I’d be glad to collaborate on refining and integrating it, along with the necessary utilities. Let me know if this sounds like a viable direction to proceed.

Labels: core (Core DataFusion crate), physical-expr (Physical Expressions)

Successfully merging this pull request may close these issues: Deterministic IDs for ExecutionPlan
4 participants